﻿using Orleans;
using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    public class SubscribeQueueServiceHandler<T> : IDisposable
    {
        private IGrainFactory grainFactory;
        private IAsyncStream<T> stream;
        protected StreamSubscriptionHandle<T> streamHandle;
        private Guid streamId;
        private string streamNamespace;
        private bool isBroadcast;
        private Func<T, StreamSequenceToken, Task> onNextAsync;

        public SubscribeQueueServiceHandler(IQueueProvider provider, IGrainFactory grainFactory, string topic, string streamNamespace,  bool isBroadcast = true)
        {
            this.streamId = iUtils.ToGuid(topic);
            this.streamNamespace = streamNamespace;
            this.stream = provider.GetStream<T>(this.streamId, streamNamespace);
            this.isBroadcast = isBroadcast;
            this.grainFactory = grainFactory;
        }


        /// <summary>
        /// 开始消费
        /// </summary>
        /// <param name="offset">从指定位置消费，默认从当前开始消费</param>
        /// <returns></returns>
        public async Task StartAsync(Func<T, StreamSequenceToken, Task> onNextAsync, Func<Exception, Task> onErrorAsync, long offset = -1)
        {
            var subscriptionHandles = (await stream.GetAllSubscriptionHandles()) ?? new List<StreamSubscriptionHandle<T>>();
            if (subscriptionHandles.Count > 0) 
            {
                foreach (var item in subscriptionHandles)
                {
                    await item.ResumeAsync(OnNextAsync);
                }
            }

            if (offset < 0 || this.isBroadcast)
            {
                this.streamHandle = await stream.SubscribeAsync(onNextAsync, onErrorAsync);
            }
            else
            {
                this.onNextAsync = onNextAsync;
                this.streamHandle = await stream.SubscribeAsync<T>(this.OnNextAsync, onErrorAsync, new EventSequenceToken(offset));
            }
            

            //var queueOffset = this.iClient.GetGrain<IStreamQueueOffset>(this.streamId, this.streamNamespace);
            if (offset > 0)
            {
                var queueOffset = this.grainFactory.GetGrain<IStreamQueueOffset>(Guid.Empty, "Aggregation");
                await queueOffset.ReSetOffset(offset);
            }
        }

        private async Task OnNextAsync(T message, StreamSequenceToken token)
        {
            var queueConsume = this.grainFactory.GetGrain<IStreamQueueConsume>(this.streamId, this.streamNamespace);
            if (!this.isBroadcast)
            {
                string messageKey = $"{token.SequenceNumber}_{token.EventIndex}";
                if (await queueConsume.IsCanConsumeMessageAsync(messageKey))
                {
                    try
                    {
                        await this.onNextAsync(message, token);
                    }
                    catch (Exception ex)
                    {
                        await queueConsume.ConsumeMessageErrorAsync(messageKey);
                        Console.WriteLine($"OnMessageAsync Error: {ex.Message}");
                        throw ex;
                    }
                }
            }
            else
            {
                await this.onNextAsync(message, token);
            }
        }

        /// <summary>
        /// 优雅的取消订阅
        /// </summary>
        /// <returns></returns>
        public async Task UnsubscribeAsync()
        {
            if (streamHandle != null)
            {
                await this.streamHandle.UnsubscribeAsync();
            }
        }

        public void Dispose()
        {
            Task.WaitAll(this.UnsubscribeAsync());
            this.streamHandle = null;
            this.stream = null;
        }
    }
}
